{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-publish-and-run-using-rest-endpoint.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# How to Publish a Pipeline and Invoke the REST endpoint\n",
        "In this notebook, we will see how we can publish a pipeline and then invoke the REST endpoint."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites and Azure Machine Learning Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. \n",
        "\n",
        "### Initialization Steps"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "from azureml.core import Workspace, Datastore, Experiment\n",
        "from azureml.core.compute import AmlCompute\n",
        "from azureml.core.compute import ComputeTarget\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)\n",
        "\n",
        "from azureml.data.data_reference import DataReference\n",
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "from azureml.pipeline.core.graph import PipelineParameter\n",
        "\n",
        "print(\"Pipeline SDK-specific imports completed\")\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n",
        "\n",
        "# Default datastore (Azure blob storage)\n",
        "# def_blob_store = ws.get_default_datastore()\n",
        "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
        "print(\"Blobstore's name: {}\".format(def_blob_store.name))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Compute Targets\n",
        "#### Retrieve an already attached  Azure Machine Learning Compute"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
        "    print(\"found existing compute target.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new compute target\")\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# For a more detailed view of current Azure Machine Learning Compute status, use get_status()\n",
        "# example: un-comment the following line.\n",
        "# print(aml_compute.get_status().serialize())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Building Pipeline Steps with Inputs and Outputs\n",
        "As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Reference the data uploaded to blob storage using DataReference\n",
        "# Assign the datasource to blob_input_data variable\n",
        "blob_input_data = DataReference(\n",
        "    datastore=def_blob_store,\n",
        "    data_reference_name=\"test_data\",\n",
        "    path_on_datastore=\"20newsgroups/20news.pkl\")\n",
        "print(\"DataReference object created\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define intermediate data using PipelineData\n",
        "processed_data1 = PipelineData(\"processed_data1\",datastore=def_blob_store)\n",
        "print(\"PipelineData object created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes a datasource and produces intermediate data.\n",
        "In this step, we define a step that consumes a datasource and produces intermediate data.\n",
        "\n",
        "**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** \n",
        "\n",
        "The best practice is to use separate folders for scripts and its dependent files for each step and specify that folder as the `source_directory` for the step. This helps reduce the size of the snapshot created for the step (only the specific folder is snapshotted). Since changes in any files in the `source_directory` would trigger a re-upload of the snapshot, this helps keep the reuse of the step when there are no changes in the `source_directory` of the step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# trainStep consumes the datasource (Datareference) in the previous step\n",
        "# and produces processed_data1\n",
        "\n",
        "source_directory = \"publish_run_train\"\n",
        "\n",
        "trainStep = PythonScriptStep(\n",
        "    script_name=\"train.py\", \n",
        "        arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n",
        "    inputs=[blob_input_data],\n",
        "    outputs=[processed_data1],\n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory\n",
        ")\n",
        "print(\"trainStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes intermediate data and produces intermediate data\n",
        "In this step, we define a step that consumes an intermediate data and produces intermediate data.\n",
        "\n",
        "**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# extractStep to use the intermediate data produced by step4\n",
        "# This step also produces an output processed_data2\n",
        "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n",
        "source_directory = \"publish_run_extract\"\n",
        "\n",
        "extractStep = PythonScriptStep(\n",
        "    script_name=\"extract.py\",\n",
        "    arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n",
        "    inputs=[processed_data1],\n",
        "    outputs=[processed_data2],\n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory)\n",
        "print(\"extractStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes multiple intermediate data and produces intermediate data\n",
        "In this step, we define a step that consumes multiple intermediate data and produces intermediate data."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### PipelineParameter"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "This step also has a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py) argument that help with calling the REST endpoint of the published pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# We will use this later in publishing pipeline\n",
        "pipeline_param = PipelineParameter(name=\"pipeline_arg\", default_value=10)\n",
        "print(\"pipeline parameter created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Now define step6 that takes two inputs (both intermediate data), and produce an output\n",
        "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n",
        "source_directory = \"publish_run_compare\"\n",
        "\n",
        "compareStep = PythonScriptStep(\n",
        "    script_name=\"compare.py\",\n",
        "    arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3, \"--pipeline_param\", pipeline_param],\n",
        "    inputs=[processed_data1, processed_data2],\n",
        "    outputs=[processed_data3],    \n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory)\n",
        "print(\"compareStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n",
        "print (\"Pipeline is built\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Run published pipeline\n",
        "### Publish the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline1 = pipeline1.publish(name=\"My_New_Pipeline\", description=\"My Published Pipeline Description\", continue_on_step_failure=True)\n",
        "published_pipeline1"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Note: the continue_on_step_failure parameter specifies whether the execution of steps in the Pipeline will continue if one step fails. The default value is False, meaning when one step fails, the Pipeline execution will stop, canceling any running steps."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Publish the pipeline from a submitted PipelineRun\n",
        "It is also possible to publish a pipeline from a submitted PipelineRun"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# submit a pipeline run\n",
        "pipeline_run1 = Experiment(ws, 'Pipeline_experiment').submit(pipeline1)\n",
        "# publish a pipeline from the submitted pipeline run\n",
        "published_pipeline2 = pipeline_run1.publish_pipeline(name=\"My_New_Pipeline2\", description=\"My Published Pipeline Description\", version=\"0.1\", continue_on_step_failure=True)\n",
        "published_pipeline2"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get published pipeline\n",
        "\n",
        "You can get the published pipeline using **pipeline id**.\n",
        "\n",
        "To get all the published pipelines for a given workspace(ws): \n",
        "```css\n",
        "all_pub_pipelines = PublishedPipeline.get_all(ws)\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PublishedPipeline\n",
        "\n",
        "pipeline_id = published_pipeline1.id # use your published pipeline id\n",
        "published_pipeline = PublishedPipeline.get(ws, pipeline_id)\n",
        "published_pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Run published pipeline using its REST endpoint\n",
        "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()\n",
        "\n",
        "rest_endpoint1 = published_pipeline.endpoint\n",
        "\n",
        "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint1))\n",
        "\n",
        "# specify the param when running the pipeline\n",
        "response = requests.post(rest_endpoint1, \n",
        "                         headers=aad_token, \n",
        "                         json={\"ExperimentName\": \"My_Pipeline1\",\n",
        "                               \"RunSource\": \"SDK\",\n",
        "                               \"ParameterAssignments\": {\"pipeline_arg\": 45}})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    response.raise_for_status()\n",
        "except Exception:    \n",
        "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
        "                    'Response Code: {}\\n'\n",
        "                    'Headers: {}\\n'\n",
        "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
        "\n",
        "run_id = response.json().get('Id')\n",
        "print('Submitted pipeline run: ', run_id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Next: Data Transfer\n",
        "The next [notebook](https://aka.ms/pl-data-trans) will showcase data transfer steps between different types of data stores."
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to Publish a Pipeline and Invoke the REST endpoint",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 3,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of Published Pipelines"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}